Partition Evolution #245
Conversation
There's still a lot of cleanup required, unit tests to add, and some bugs I'm encountering. But I'm putting up this draft since the core pieces are here.
if not source_name:
    raise ValueError(f"Could not find column with id {field.source_id}")

transform = field.transform
Nice to have: Do we also want to have a single dispatch to map these types?
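For reference, a minimal sketch of what a singledispatch-based mapping could look like. The function name transform_to_repr and its string outputs are illustrative assumptions, not pyiceberg's actual API:

# Hypothetical sketch only: transform_to_repr is made up for illustration.
from functools import singledispatch

from pyiceberg.transforms import BucketTransform, IdentityTransform, Transform


@singledispatch
def transform_to_repr(transform: Transform) -> str:
    # Fallback for transform types without a registered handler.
    raise ValueError(f"Unsupported transform: {transform}")


@transform_to_repr.register
def _(transform: IdentityTransform) -> str:
    return "identity"


@transform_to_repr.register
def _(transform: BucketTransform) -> str:
    return f"bucket[{transform.num_buckets}]"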
Can you create an issue for this?
pyiceberg/table/__init__.py (Outdated)
def last_partition_id(self) -> Optional[int]:
    """Return the highest assigned partition field ID across all specs for the table, or None if the table is unpartitioned and there are no specs."""
    if len(self.specs()) == 1 and self.spec().is_unpartitioned():
        return None
    return self.metadata.last_partition_id
I think we probably should update
iceberg-python/pyiceberg/partitioning.py
Lines 148 to 152 in 7fbcc22
@property
def last_assigned_field_id(self) -> int:
    if self.fields:
        return max(pf.field_id for pf in self.fields)
    return PARTITION_FIELD_ID_START
to return PARTITION_FIELD_ID_START - 1 for the unpartitioned spec. Then we can return the last_partition_id from the metadata directly, because the metadata should have last_partition_id=999 for an unpartitioned table.
Java implementation uses PARTITION_FIELD_ID_START - 1 for unpartitioned spec:
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L344-L345
https://github.com/apache/iceberg/blob/9921937d8285dec9a19fd16b0cd82d451a8aca9e/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L319-L321
I checked locally that unpartitioned tables created by spark-iceberg-runtime have last_partition_id=999, while those created by pyiceberg have last_partition_id=1000.
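Put together, a sketch of the suggested change (assuming PARTITION_FIELD_ID_START = 1000, as in pyiceberg/partitioning.py):

@property
def last_assigned_field_id(self) -> int:
    if self.fields:
        return max(pf.field_id for pf in self.fields)
    # Unpartitioned spec: return one less than the first valid partition
    # field ID, so the next assigned ID starts at 1000.
    return PARTITION_FIELD_ID_START - 1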
I was thinking about that but wasn't sure about breaking API behavior, which is why I added a new API. If we have the flexibility here to change the API, we should do that. I think we can, because arguably it's incorrect to return 1000 for an unpartitioned table, so it's really a fix.
update.remove_field("day_ts").remove_field("bucketed_id")
with table_v2.update_spec() as update:
    update.add_field("str", TruncateTransform(2), "truncated_str")
_validate_new_partition_fields(table_v2, 1002, 2, PartitionField(3, 1002, TruncateTransform(2), "truncated_str"))
This test shows why assigning new field IDs based on the last field ID across all specs is important to avoid collisions. In this case, if we just used the last spec, then after the remove_field is done for the original partitions, the latest spec would just be the unpartitioned spec. When we then go and add the new truncated_str partition field, we would create a field ID of 1000, which is not what we want (it would collide with the original field ID 1000 of the bucket transform on id).
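To make the collision concrete, here is a small self-contained illustration. The numbers mirror this test (bucket(id)=1000, day(ts)=1001 in the original spec); it is not code from the PR:

# Field IDs per spec: spec 0 is the original partitioned spec, spec 1 is
# the unpartitioned spec left behind after the remove_field calls.
specs = {0: [1000, 1001], 1: []}

# Wrong: derive the next ID from the latest spec only -> 1000, colliding
# with the bucket transform's original field ID.
next_from_last_spec = max(specs[1], default=999) + 1
assert next_from_last_spec == 1000

# Right: derive it from the highest ID across all specs -> 1002, which is
# exactly what _validate_new_partition_fields expects above.
next_across_all_specs = max((fid for ids in specs.values() for fid in ids), default=999) + 1
assert next_across_all_specs == 1002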
@@ -308,7 +308,8 @@ def construct_partition_specs(cls, data: Dict[str, Any]) -> Dict[str, Any]:
         data[PARTITION_SPECS] = [{"field-id": 0, "fields": ()}]

     data[LAST_PARTITION_ID] = max(
-        [field.get(FIELD_ID) for spec in data[PARTITION_SPECS] for field in spec[FIELDS]], default=PARTITION_FIELD_ID_START
+        [field.get(FIELD_ID) for spec in data[PARTITION_SPECS] for field in spec[FIELDS]],
+        default=PARTITION_FIELD_ID_START - 1,
This needs to be updated so that in case there are partition specs, we return 999. It's insufficient to just update the PartitionSpec#last_assigned_field_id method. I do believe this is spec compliant, since the spec doesn't explicitly say what values these IDs should be. This is also what Spark does when one creates an unpartitioned table. The spec does say that in v1, IDs were assigned starting at 1000, which is still followed. So I think we're covered. @Fokko @HonahX
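A minimal standalone illustration of the default= fallback in the diff above (the values follow the discussion; this is not the PR's code):

PARTITION_FIELD_ID_START = 1000

# Unpartitioned table: no partition fields in any spec, so the
# comprehension is empty and max() falls back to 999.
field_ids = []
assert max(field_ids, default=PARTITION_FIELD_ID_START - 1) == 999

# Partitioned table: the highest existing field ID wins.
assert max([1000, 1001], default=PARTITION_FIELD_ID_START - 1) == 1001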
Ah, I see. So the next one will be 1000. It feels a bit like a workaround, let me check
@@ -277,7 +277,7 @@ def test_create_table(table_schema_simple: Schema, hive_database: HiveDatabase,
         )
     ],
     current_schema_id=0,
-    last_partition_id=1000,
+    last_partition_id=999,
I think this one is a bit odd since it is a V2 table. I had to dig into the code myself a bit as well. I noticed that the last_partition_id is optional in the metadata. What do you think of the following solution: amogh-jahagirdar#1
I checked a V2 unpartitioned table created by spark-iceberg-runtime, and the last_partition_id stored in the metadata is 999. Therefore I suggested updating last_partition_id() in pyiceberg to align with the Java implementation.
In general, I think 999 is spec compliant since it is for the UnpartitionedSpec, where there is no existing partition field. It implies that 1000 will be the ID for the first valid partition field, and thus aligns with the spec. Do these sound reasonable? Appreciate your thoughts on this!
@Fokko I took a look and integrated the changes, and after going back and forth I think I'd like to keep the changes as is.
My rationale is that even after those changes, there are some more workarounds needed to make sure new IDs start at 1000 (after taking the changes directly, field IDs for non-REST will start at 1001 without any more changes).
Now, technically it does not seem to be a hard requirement that IDs need to start at 1000 for v2 tables. Even for v1, starting at 1000 does not seem to be a requirement, rather just how the Java lib was implemented:
In v1, partition field IDs were not tracked, but were assigned sequentially starting at 1000 in the reference implementation.
I read this as "this was how we originally implemented it in Java, but it's not really required so long as it's unique and increasing for new fields".
All that said, I'd advocate for just following the practice of starting at 1000 for both v1 and v2, because it's the established model and avoids any confusion.
On returning 999 for the unpartitioned spec:
As @HonahX alluded to, I think that makes for a logical API: considering we want to start a field at 1000, the unpartitioned spec (no fields) last assigned field ID should be one less than that. I don't think we want to return 1000 in that case. This is also what Spark sets for the unpartitioned spec, and it is spec compliant (since the spec doesn't mandate any particular IDs).
I could see the argument that we just want to return None for a last_assigned_partition_id in this API, but that just shifts the responsibility to other locations to make sure IDs are set correctly according to our scheme, which ends up being messier imo compared to just defining the API to return 999 if the only spec is the unpartitioned spec (which is the value that would be set anyways).
I also think it makes sense to set last-assigned-partition-id for both V1 and V2. Even though it's optional for V1, we set the other optional fields for v1 metadata, so it seems a bit odd to make this particular case an exception.
Sounds reasonable to me, and as long as it is in line with the spec, I'm okay with it 👍
@pytest.mark.integration
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
I've parameterized all the tests to run against both Hive and REST.
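As an illustration, a hypothetical test in that parameterized style (the test body and table name are assumptions for illustration, not the PR's actual tests):

import pytest

from pyiceberg.catalog import Catalog


@pytest.mark.integration
@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])
def test_add_identity_partition(catalog: Catalog) -> None:
    # Hypothetical table name; the test runs once against Hive, once against REST.
    table = catalog.load_table("default.test_partition_evolution")
    with table.update_spec() as update:
        update.add_identity("id")
    assert not table.spec().is_unpartitioned()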
Thanks @amogh-jahagirdar for working on this, and sorry for the long wait for the review. Thanks @HonahX for the review 🙌
Fixes #193